#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "adt.h"
#include "ynet_rpc.h"
#include "cluster.h"
#include "core.h"
#include "net_global.h"
#include "locator_rpc.h"
#include "timer.h"
#include "md_map.h"
#include "dbg.h"

typedef struct {
        chkid_t chkid;
        nid_t nid;
        uint32_t magic;
        time_t last_retry;
        time_t loading;
} entry_t;

static mcache_t *cache;

STATIC int __md_map_update_maping(const chkid_t *chkid, const nid_t *nid);

static int __entry_new(entry_t **_ent, const chkid_t *chkid, const nid_t *nid)
{
        int ret;
        entry_t *ent;

        ret = ymalloc((void**)&ent, sizeof(entry_t));
        if (unlikely(ret)) 
                GOTO(err_ret, ret);

        ent->chkid = *chkid;
        ent->nid = *nid;
        ent->last_retry = 0;
        ent->loading = 0;
        ent->magic = -1;
        *_ent = ent;

        return 0;
err_ret:
        return ret;
}

static void __entry_free(entry_t *ent)
{
        yfree((void **)&ent);
}

STATIC void __md_map_release(mcache_entry_t *cent)
{
        mcache_release(cent);
}

STATIC int __md_map_check(const chkid_t *chkid, nid_t *nid, time_t *ltime)
{
        int ret;

        ret = network_connect(nid, ltime, 1, 1);
        if (unlikely(ret)) {
                DINFO("chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                ret = locator_rpc_lookup(chkid, nid, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = network_connect(nid, ltime, 0, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __md_map_new(const chkid_t *chkid, const nid_t *nid)
{
        int ret;
        entry_t *ent;

        ret = __entry_new(&ent, chkid, nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(chkid),
             network_rname(nid));

        ret = mcache_insert(cache, chkid, ent);
        if (unlikely(ret)) {
                __entry_free(ent);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __md_map_update(mcache_entry_t *cent, const nid_t *nid)
{
        int ret;
        entry_t *ent;
        time_t ltime;

        ret = network_connect(nid, &ltime, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = cent->value;
        ent->nid = *nid;
        ent->magic = ltime;

        mcache_unlock(cent);

        return 0;
err_ret:
        return ret;
}

STATIC void *__md_map_load__(void *arg)
{
        int ret, res;
        chkid_t *chkid;
        entry_t *ent;
        mcache_entry_t *cent;
        nid_t nid;
        time_t ltime;

        chkid = arg;

        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);
                
        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;
        nid = ent->nid;

        mcache_unlock(cent);

        res = __md_map_check(chkid, &nid, &ltime);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;
        ent->loading = 0;
        if (res == 0) {
                ent->nid = nid;
                ent->magic = ltime;
        }

        mcache_unlock(cent);
        mcache_release(cent);

        yfree((void **)&arg);
        pthread_exit(NULL);
err_release:
        mcache_release(cent);
err_ret:
        yfree((void **)&arg);
        pthread_exit(NULL);
}

STATIC int __md_map_load(mcache_entry_t *cent, const chkid_t *_chkid)
{
        int ret;
        time_t now, tmo;
        entry_t *ent;
        chkid_t *chkid;

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = cent->value;
        now = gettime();
        if (now - ent->last_retry < 0) {
                ent->last_retry = 0;
        }

        tmo = now - ent->last_retry;
        if (tmo < 1) {
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        if (now - ent->loading < 1) {
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }

        ent->last_retry = now;
        ent->loading = now;
        ent->magic = -1;

        mcache_unlock(cent);

        ret = ymalloc((void**)&chkid, sizeof(*chkid));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *chkid = *_chkid;

        ret = sy_thread_create2(__md_map_load__, chkid, __FUNCTION__);
        if (unlikely(ret))
                UNIMPLEMENTED(__WARN__);

        return 0;
err_lock:
        mcache_unlock(cent);
err_ret:
        return ret;
}

#if 0
STATIC int __md_map_check_reset(mcache_entry_t *cent)
{
        int ret;
        entry_t *ent;
        nid_t newid;
        time_t ltime;

        ent = cent->value;
        ret = network_connect(&ent->nid, &ltime, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        if (ent->magic == ltime) {
                DBUG(CHKID_FORMAT" useable\n", CHKID_ARG(&ent->chkid));
                return 0;
        } else {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (ent->magic != ltime) {
                DINFO(CHKID_FORMAT" reset\n", CHKID_ARG(&ent->chkid));
                
                ret = locator_rpc_lookup(&ent->chkid, &newid, 0);
                if (unlikely(ret)) {
                        GOTO(err_lock, ret);
                }
        
                ent->magic = ltime;
                ent->nid = newid;

                ret = __md_map_update_maping(&ent->chkid, &newid);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }
                
        mcache_unlock(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_ret:
        return ret;
}
#endif

STATIC int __md_map_getsrv(mcache_entry_t *cent, nid_t *nid)
{
        int ret;
        entry_t *ent;
        time_t ltime;

#if 0
        ret = __md_map_check_reset(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif
        
        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ent = cent->value;

        ret = network_connect(&ent->nid, &ltime, 0, 0);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }
        
        if (ent->magic != ltime) {
                DBUG(CHKID_FORMAT" useable\n", CHKID_ARG(&ent->chkid));
                ret = EAGAIN;
                GOTO(err_lock, ret);
        }
        
        *nid = ent->nid;

        mcache_unlock(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_ret:
        return ret;
}

STATIC int __md_map_lookup(const chkid_t *chkid, nid_t *nid)
{
        int ret;
        char buf[MAX_NAME_LEN];

        ret = maping_get(ID2NID, id2str(chkid), buf, NULL);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DINFO("lookup "CHKID_FORMAT"\n", CHKID_ARG(chkid));
                        ret = locator_rpc_lookup(chkid, nid, 0);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
        
                        YASSERT(nid->id > 0);

                        nid2str(buf, nid);
                        ret = maping_set(ID2NID, id2str(chkid), buf);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                } else {
                        GOTO(err_ret, ret);
                }
        } else {
                DBUG("get "CHKID_FORMAT" --> %s\n", CHKID_ARG(chkid), buf);
                str2nid(nid, buf);
                YASSERT(nid->id > 0);
        }

        return 0;
err_ret:
        return ret;
}

STATIC int __md_map_update_maping(const chkid_t *chkid, const nid_t *nid)
{
        int ret;
        char buf[MAX_NAME_LEN];

        YASSERT(nid->id > 0);
        
        nid2str(buf, nid);
        ret = maping_set(ID2NID, id2str(chkid), buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("set "CHKID_FORMAT" --> %s\n",
             CHKID_ARG(chkid), buf);
        
        return 0;
err_ret:
        return ret;
}

static inline int __md_map_drop(const chkid_t *chkid)
{
        int ret;

        ret = maping_drop(ID2NID, id2str(chkid));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("drop "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        return 0;
err_ret:
        return ret;
}

int md_map_getsrv(const chkid_t *chkid, nid_t *nid)
{
        int ret, retry = 0;
        mcache_entry_t *cent;

        ANALYSIS_BEGIN(0);

        if (chkid_isroot(chkid)) {
                *nid = *net_getadmin();
                return 0;
        }

        YASSERT(chkid->type == __POOL_CHUNK__
                || chkid->type == __VOLUME_CHUNK__);

        YASSERT(chkid->idx == 0);

retry:
        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DBUG("chunk "CHKID_FORMAT" get\n", CHKID_ARG(chkid));

                        ret = __md_map_lookup(chkid, nid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        ret = __md_map_new(chkid, nid);
                        if (unlikely(ret)) {
                                if (ret == EEXIST)
                                        goto retry;
                                else
                                        GOTO(err_ret, ret);
                        }

                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }

retry1:
        ret = __md_map_getsrv(cent, nid);
        if (unlikely(ret)) {
                __md_map_drop(chkid);
                __md_map_load(cent, chkid);
                if (_errno(ret) == EAGAIN) {
                        if (ng.daemon) {
                                GOTO(err_release, ret);
                        } else {
                                USLEEP_RETRY(err_release, ret, retry1, retry,
                                             3, (100 * 1000));
                        }
                } else {
                        GOTO(err_release, ret);
                }
        }

        __md_map_release(cent);

        ANALYSIS_END(0, IO_WARN * (gloconf.rpc_timeout / 2), NULL);
        
        return 0;
err_release:
        __md_map_release(cent);
err_ret:
        ANALYSIS_END(0, IO_WARN * (gloconf.rpc_timeout / 2), NULL);
        return ret;
}

int md_map_update(const chkid_t *chkid, const nid_t *nid)
{
        int ret;
        mcache_entry_t *cent;

        if (chkid->type != __VOLUME_CHUNK__ && chkid->type != __POOL_CHUNK__) {
                goto out;
        }

        YASSERT(chkid->idx == 0);

retry:
        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DBUG("chunk "CHKID_FORMAT" get\n", CHKID_ARG(chkid));
                        ret = __md_map_new(chkid, nid);
                        if (unlikely(ret)) {
                                if (ret == EEXIST)
                                        goto retry;
                                else
                                        GOTO(err_ret, ret);
                        }
                } else
                        GOTO(err_ret, ret);
        } else {
                ret = __md_map_update(cent, nid);
                if (unlikely(ret))
                        GOTO(err_release, ret);
        
                __md_map_release(cent);
        }

        ret = __md_map_update_maping(chkid, nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
out:
        return 0;
err_release:
        __md_map_release(cent);
err_ret:
        return ret;
}

static int __md_map_update1(mcache_entry_t *cent, const chkid_t *chkid)
{
        int ret;
        nid_t newid;

        DINFO("chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));
        
        ret = locator_rpc_lookup(chkid, &newid, 0);
        if (unlikely(ret)) {
                mcache_drop(cent);
                GOTO(err_ret, ret);
        }

        ret = __md_map_update(cent, &newid);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = __md_map_update_maping(chkid, &newid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int md_map_drop(const chkid_t *chkid, const nid_t *nid)
{
        int ret;
        mcache_entry_t *cent;
        entry_t *ent;

        if (chkid->type != __VOLUME_CHUNK__ && chkid->type != __POOL_CHUNK__) {
                goto out;
        }

        __md_map_drop(chkid);

        ret = mcache_get(cache, chkid, &cent);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        //XXX
                } else
                        GOTO(err_ret, ret);
        } else {
                ent = cent->value;

                if (nid_cmp(&ent->nid, nid) == 0) {
                        ret = __md_map_update1(cent, chkid);
                        if (unlikely(ret))
                                GOTO(err_release, ret);
                }

                __md_map_release(cent);
        }

out:
        return 0;
err_release:
        __md_map_release(cent);
err_ret:
        return ret;
}

static int __cmp(const void *s1, const void *s2)
{
        const chkid_t *id1 = s1;
        const chkid_t *id2 = &((entry_t *)s2)->chkid;

        DBUG("cmp "CHKID_FORMAT" : "CHKID_FORMAT"\n",
             CHKID_ARG(id1), CHKID_ARG(id2));

        return !chkid_cmp(id1, id2);
}

static uint32_t __hash(const void *key)
{
        const chkid_t *id = key;

        return id->id;
}

static uint32_t __core_hash(const void *key)
{
        const chkid_t *id = key;

        return core_hash(id);
}

static int __drop(void *value, mcache_entry_t *cent, int recycle)
{
        entry_t *ent;

        (void) cent;
        ent = (entry_t *)value;

        if (ent) {
                if (recycle) {
                        entry_t *entry = value;
                        DINFO("recycle "CHKID_FORMAT"\n", &entry->chkid);
                }
                yfree((void **)&ent);
        }

        return 0;
}

static  worker_handler_t __md_map__;

STATIC int __md_map_cleanup(void *arg)
{
        int ret;

        (void) arg;

        ret = maping_cleanup(ID2NID);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        ret = timer1_settime(&__md_map__, USEC_PER_DAY);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        return 0;
err_ret:
        return ret;
}

int md_map_init()
{
        int ret;

        ret = mcache_init(&cache, MCACHE_SIZE, __cmp, __hash, __core_hash,
                          __drop, 0, "md_map");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!ng.daemon) {
                goto out;
        }

        ret = locator_rpc_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_create(&__md_map__, "md_map_cleanup", __md_map_cleanup, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_settime(&__md_map__, USEC_PER_DAY);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
        
out:
        return 0;
err_ret:
        return ret;
}
